package com.nulldev.util.internal.backport.httpclient_rw;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import com.nulldev.util.internal.backport.concurrency9.Objects;
import com.nulldev.util.internal.backport.concurrency9.concurrent.CompletionStage;
import com.nulldev.util.internal.backport.concurrency9.concurrent.Flow;
import com.nulldev.util.internal.backport.httpclient_rw.HttpResponse.BodySubscriber;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.Demand;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.SequentialScheduler;

/**
 * A buffering BodySubscriber. When subscribed, accumulates ( buffers ) a given
 * amount ( in bytes ) of a publisher's data before pushing it to a downstream
 * subscriber.
 */
public class BufferingSubscriber<T> implements BodySubscriber<T> {
	/** The downstream consumer of the data. */
	private final BodySubscriber<T> downstreamSubscriber;
	/** The amount of data to be accumulate before pushing downstream. */
	private final int bufferSize;

	/** The subscription, created lazily. */
	private volatile Flow.Subscription subscription;
	/** The downstream subscription, created lazily. */
	private volatile DownstreamSubscription downstreamSubscription;

	/** Must be held when accessing the internal buffers. */
	private final Object buffersLock = new Object();
	/** The internal buffers holding the buffered data. */
	private final LinkedBlockingQueue<ByteBuffer> internalBuffers;
	/** The actual accumulated remaining bytes in internalBuffers. */
	private int accumulatedBytes;

	/** Holds the Throwable from upstream's onError. */
	private volatile Throwable throwable;

	/**
	 * State of the buffering subscriber: 1) [UNSUBSCRIBED] when initially created
	 * 2) [ACTIVE] when subscribed and can receive data 3) [ERROR | CANCELLED |
	 * COMPLETE] (terminal state)
	 */
	static final int UNSUBSCRIBED = 0x01;
	static final int ACTIVE = 0x02;
	static final int ERROR = 0x04;
	static final int CANCELLED = 0x08;
	static final int COMPLETE = 0x10;

	private volatile int state;

	public BufferingSubscriber(BodySubscriber<T> downstreamSubscriber, int bufferSize) {
		this.downstreamSubscriber = Objects.requireNonNull(downstreamSubscriber);
		this.bufferSize = bufferSize;
		this.internalBuffers = new LinkedBlockingQueue<ByteBuffer>();
		state = UNSUBSCRIBED;
	}

	/** Returns the number of bytes remaining in the given buffers. */
	private static final long remaining(List<ByteBuffer> buffers) {
		return buffers.stream().mapToLong(ByteBuffer::remaining).sum();
	}

	/**
	 * Tells whether, or not, there is at least a sufficient number of bytes
	 * accumulated in the internal buffers. If the subscriber is COMPLETE, and has
	 * some buffered data, then there is always enough ( to pass downstream ).
	 */
	private final boolean hasEnoughAccumulatedBytes() {
		assert Thread.holdsLock(buffersLock);
		return accumulatedBytes >= bufferSize || (state == COMPLETE && accumulatedBytes > 0);
	}

	/**
	 * Returns a new, unmodifiable, List<ByteBuffer> containing exactly the amount
	 * of data as required before pushing downstream. The amount of data may be less
	 * than required ( bufferSize ), in the case where the subscriber is COMPLETE.
	 */
	private List<ByteBuffer> fromInternalBuffers() {
		assert Thread.holdsLock(buffersLock);
		int leftToFill = bufferSize;
		int state = this.state;
		assert (state == ACTIVE || state == CANCELLED) ? accumulatedBytes >= leftToFill : true;
		final List<ByteBuffer> dsts = new ArrayList<ByteBuffer>();

		final Iterator<ByteBuffer> itr = internalBuffers.iterator();
		while (itr.hasNext()) {
			ByteBuffer b = itr.next();
			if (b.remaining() <= leftToFill) {
				itr.remove();
				if (b.position() != 0)
					b = b.slice(); // ensure position = 0 when propagated
				dsts.add(b);
				leftToFill -= b.remaining();
				accumulatedBytes -= b.remaining();
				if (leftToFill == 0)
					break;
			} else {
				int prevLimit = b.limit();
				b.limit(b.position() + leftToFill);
				ByteBuffer slice = b.slice();
				dsts.add(slice);
				b.limit(prevLimit);
				b.position(b.position() + leftToFill);
				accumulatedBytes -= leftToFill;
				leftToFill = 0;
				break;
			}
		}
		assert (state == ACTIVE || state == CANCELLED) ? leftToFill == 0 : state == COMPLETE;
		assert (state == ACTIVE || state == CANCELLED) ? remaining(dsts) == bufferSize : state == COMPLETE;
		assert accumulatedBytes >= 0;
		assert dsts.stream().noneMatch(b -> b.position() != 0);
		return Collections.unmodifiableList(dsts);
	}

	/** Subscription that is passed to the downstream subscriber. */
	private class DownstreamSubscription implements Flow.Subscription {
		private final AtomicBoolean cancelled = new AtomicBoolean(); // false
		private final Demand demand = new Demand();
		private volatile boolean illegalArg;

		@Override
		public void request(long n) {
			if (cancelled.get() || illegalArg) {
				return;
			}
			if (n <= 0L) {
				// pass the "bad" value upstream so the Publisher can deal with
				// it appropriately, i.e. invoke onError
				illegalArg = true;
				subscription.request(n);
				return;
			}

			demand.increase(n);

			pushDemanded();
		}

		private final SequentialScheduler pushDemandedScheduler = new SequentialScheduler(new PushDemandedTask());

		void pushDemanded() {
			if (cancelled.get())
				return;
			pushDemandedScheduler.runOrSchedule();
		}

		class PushDemandedTask extends SequentialScheduler.CompleteRestartableTask {
			@Override
			public void run() {
				try {
					Throwable t = throwable;
					if (t != null) {
						pushDemandedScheduler.stop(); // stop the demand scheduler
						downstreamSubscriber.onError(t);
						return;
					}

					while (true) {
						List<ByteBuffer> item;
						synchronized (buffersLock) {
							if (cancelled.get())
								return;
							if (!hasEnoughAccumulatedBytes())
								break;
							if (!demand.tryDecrement())
								break;
							item = fromInternalBuffers();
						}
						assert item != null;

						downstreamSubscriber.onNext(item);
					}
					if (cancelled.get())
						return;

					// complete only if all data consumed
					boolean complete;
					synchronized (buffersLock) {
						complete = state == COMPLETE && internalBuffers.isEmpty();
					}
					if (complete) {
						assert internalBuffers.isEmpty();
						pushDemandedScheduler.stop(); // stop the demand scheduler
						downstreamSubscriber.onComplete();
						return;
					}
				} catch (Throwable t) {
					cancel(); // cancel if there is any error
					throw t;
				}

				boolean requestMore = false;
				synchronized (buffersLock) {
					if (!hasEnoughAccumulatedBytes() && !demand.isFulfilled()) {
						// request more upstream data
						requestMore = true;
					}
				}
				if (requestMore)
					subscription.request(1);
			}
		}

		@Override
		public void cancel() {
			if (cancelled.compareAndSet(false, true)) /* FIXME: compareAndExchange */
				return; // already cancelled

			state = CANCELLED; // set CANCELLED state of upstream subscriber
			subscription.cancel(); // cancel upstream subscription
			pushDemandedScheduler.stop(); // stop the demand scheduler
		}
	}

	@Override
	public void onSubscribe(Flow.Subscription subscription) {
		Objects.requireNonNull(subscription);
		if (this.subscription != null) {
			subscription.cancel();
			return;
		}

		int s = this.state;
		assert s == UNSUBSCRIBED;
		state = ACTIVE;
		this.subscription = subscription;
		downstreamSubscription = new DownstreamSubscription();
		downstreamSubscriber.onSubscribe(downstreamSubscription);
	}

	@Override
	public void onNext(List<ByteBuffer> item) {
		Objects.requireNonNull(item);

		int s = state;
		if (s == CANCELLED)
			return;

		if (s != ACTIVE)
			throw new InternalError("onNext on inactive subscriber");

		synchronized (buffersLock) {
			internalBuffers.addAll(item);
			accumulatedBytes += remaining(item);
		}

		downstreamSubscription.pushDemanded();
	}

	@Override
	public void onError(Throwable incomingThrowable) {
		Objects.requireNonNull(incomingThrowable);
		int s = state;
		assert s == ACTIVE : "Expected ACTIVE, got:" + s;
		state = ERROR;
		Throwable t = this.throwable;
		assert t == null : "Expected null, got:" + t;
		this.throwable = incomingThrowable;
		downstreamSubscription.pushDemanded();
	}

	@Override
	public void onComplete() {
		int s = state;
		assert s == ACTIVE : "Expected ACTIVE, got:" + s;
		state = COMPLETE;
		downstreamSubscription.pushDemanded();
	}

	@Override
	public CompletionStage<T> getBody() {
		return downstreamSubscriber.getBody();
	}
}
